EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
-------------------------调用bind(PORT)后发生的流程-----------------
跳入到AbstractBootstrap的bind方法
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(InetAddress inetHost, int inetPort) {
return bind(new InetSocketAddress(inetHost, inetPort));
}
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
.....//省略一会分析
}
先看看initAndRegister()方法,这个方法做2件事
1通过反射实例化在ServerBootStrap.channel(NioServerSocketChannel.class)配置的具体Channel的Class,并初始化它channel实例
2.将新创建的Channel注册到分配到的事件执行器上对应的selector,然后发布channel的register事件
final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
try {
1. init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
2. ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
先分析init(channel)方法。这是一个多态的方法,Bootstrap和ServerBootstrap有各自的实现。(将自定义行为延后到子类实现,达到了可扩展)
1.将配置在bootstrap上的ChannelOption 设置到Channel对应的ChannelConfig上
2.将配置在bootstrap上的AttributeKey设置到Channel上
3.将配置在bootstrap上的ChannleHandler加到channel的channelPipeline最后(channelPipeline是一个类似链表的结构)
4.增加一个ChannelInitializer到pipeline最后。通过ChannelInitializer增加一个继承自ChannelInboundHandlerAdapter的ServerBootstrapAcceptor实例到pipeline最后。ServerBootstrapAcceptor接收配置在bootstrap上childGroup,childHandler,ChildOptions,ChildAttrs几个参数,用于将accept到的client channel做初始化
void init(Channel channel) throws Exception {
//1.
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}
//2.
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//3.
ChannelPipeline p = channel.pipeline();
if (handler() != null) {
p.addLast(handler());
}
//4.
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
再分析ChannelFuture regFuture = group().register(channel)方法,上面传入的eventLoopGroup是NioEventLoopGroup,发现register(channel)进入的是MultithreadEventLoopGroup类的方法,分析一下register(channel)方法。有2个步骤。
1.首先调用了父类MultithreadEventExecutorGroup类的next()方法,该方法是通过一个事件执行器选择器来选择一个事件执行器实例
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
public EventExecutor next() {
return chooser.next();
}
代码中可以看出netty对性能的极致追求,提供了2种选择器,选择器的算法都一样,都是基于轮训算法,比如有5台机器,第一次分请求到了第一台机器,第二次到了第二台机器,第三次请求到了第三台请求,以此类推一直到第五台机器,然后第六次又到了第一台机器,这样一个轮流的调用
区别在于如果线程数是2的次幂的时候,采用移位的方式算出下一个(原理是2次幂的数-1后其最高位为0,其余最低位为1,因此能表示通过& 能取0-该数-1的位置)。否则采用取模的方式。
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (threadFactory == null) {
threadFactory = newDefaultThreadFactory();
}
children = new SingleThreadEventExecutor[nThreads];
if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}
..........省略
}
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
//位移方式轮训
return children[childIndex.getAndIncrement() & children.length - 1];
}
}
private final class GenericEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
//取模方式轮训
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
}
2.,然后调用选择到的SingleThreadEventLoop的register(channel)方法来注册channel,创建了一个DefaultChannelPromise,因为netty里面IO操作都是异步的,这个DefaultChannelPromise代表了注册逻辑的异步结果,实际最终起作用的是通过channel对应的unsafe
@Override
public ChannelFuture register(Channel channel) {
return register(channel, new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (promise == null) {
throw new NullPointerException("promise");
}
channel.unsafe().register(this, promise);
return promise;
}
debug最终进入到AbstractChannel的内部类AbstractUnsafe的register(EventLoop, promise)方法
分析下register0(ChannelPromise promise)方法。
先尝试把注册异步结果设置为不可取消和检查channel还在打开,将eventloop赋值给channel的eventloop属性
调用doRegister()成功将serversocketchannel注册到selector上后,在channel的pipeline上传递ChannelRegistered事件。
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
//首先检查channel未注册过
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
//检查channel是和指定eventloop兼容
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
//分配指定eventLoop给channel
AbstractChannel.this.eventLoop = eventLoop;
//判断当前线程与eventLoop绑定的线程是否一致,这是为了避免竞态条件,不需要锁做同步
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
//如果分配给channel的eventLoop对应的线程与当前线程不是同一个线程,则封装成任务投递给eventloop执行
try {
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (firstRegistration && isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
跳转到AbstractNioChannel的doRegister();
javaChannel()获取java原生的ServerSocketChannel,然后调register(selector,interOps,Obj)方法将channel注册到selector上
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
顺便说一下,之前的代码分析中,在init(channel)方法中有将一个channelinitializer添加到pipeline的最后,那么这个channelinitializer在哪里生效呢,就在pipeline.fireChannelRegistered()中被调用。因为channelinitializer继承于ChannelInboundHandlerAdapter类,用于拦截inbound事件,当触发ChannelRegistered事件时,channelinitializer的channelRegistered方法拦截执行。
initChannel((C) ctx.channel())方法是Channelinitializer的一个抽象方法,用于扩展不同的初始化实现。所以继承自ChannelInboundHandlerAdapter的ServerBootstrapAcceptor实例到pipeline最后。开始执行accept功能
可以看到,首先执行了子类自定义实现的initChannel((C) ctx.channel())方法,然后在pipeline中删除Channelinitializer自己,所以之前再调用 ctx.fireChannelRegistered();把事件往下传递
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ChannelPipeline pipeline = ctx.pipeline();
boolean success = false;
try {
initChannel((C) ctx.channel());
pipeline.remove(this);
ctx.fireChannelRegistered();
success = true;
} catch (Throwable t) {
logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
} finally {
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
if (!success) {
ctx.close();
}
}
}
接着分析doBind() initAndRegister()方法后的代码.
首先判断注册异步结果是否完成
1.如果已经完成,则直接调用doBind0(regFuture, channel, localAddress, promise);
2.未完成的话通过增加一个监听器的方式,等通知注册完成时调用doBind0(regFuture, channel, localAddress, promise);
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.executor = channel.eventLoop();
}
doBind0(regFuture, channel, localAddress, promise);
}
});
return promise;
}
}
分析下doBind0()的代码,如果注册异步结果是成功了的,就调用channel的bind()方法。
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
channel的bind()方法,实际上是调用pipeline的bind方法
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); }
pipeline是把bind请求从尾部往前传递。(由此可见oubound请求都是从tail->head)
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); }
TailContext继承于AbstractChannelHandlerContext,调用的是父类的bind方法
static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
private static final String TAIL_NAME = generateName0(TailContext.class);
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
}
....
}
分析AbstractChannelHandlerContext的bind()方法
1.findContextOutbound()方法是找出pipeline链中下一个的outbound的channelHandler对应的context(挂在pipeline链表上的是包装了channelHandler的channelHandlerContext)。分析代码可以看到是往前面迭代的,找到一个outbound标志位为true的context就返回。
2.找到的context从channel中获取到channel注册的那个eventloop,然后判断当前线程与该eventloop对应的线程是否同一个,是则调用context的invokeBind方法。不是的话则包装成一个任务投递到eventloop的任务队列中执行。(可以看到netty对多线程的处理是无锁编程,将对应的操作投递给属于自己的线程执行)
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (!validatePromise(promise, false)) {
// cancelled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new OneTimeTask() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
try {
executor.execute(runnable);
} catch (Throwable cause) {
try {
promise.setFailure(cause);
} finally {
if (msg != null) {
ReferenceCountUtil.release(msg);
}
}
}
}
3.最终是执行AbstractChannelHandlerContext的invokeBind()方法
实际是调用context对应的channleHandler的bind()方法。在创建pipeline时会加一个HeadContext的实例到pipeline作为链头,HeadContext实现了ChannelOutboundHandler,从而实现了bind()方法,其实outbound请求——bind请求最后的处理者是HeadContext
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
看HeadContext的bind方法,所以最后做实际事情bind操作的是unsafe的bind方法。这个unsafe来自于channel自身对应的unsafe
public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
unsafe.bind(localAddress, promise);
}
进入到AbstractNioChannel的AbstractUnsafe的bind方法
先确保channel还是在open状态,doBind方法在NioServerSocketChannel实现,里面实现就是调用java原生serverSocketChannel的bind方法,然后投递OneTimeTask任务到channel对应的eventloop,任务主要做的事情是开始传递Inbound事件ChannelActive,
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
pipeline的fireChannelActive方法可以看出inbound事件是从head->tail这样传递的,Tail是实现了ChannelInboundHandler接口的,实现的channelRead是释放消息引用。
static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
private static final String TAIL_NAME = generateName0(TailContext.class);
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
public ChannelPipeline fireChannelActive() {
head.fireChannelActive();
//Channel状态为active后则触发channel.read()
if (channel.config().isAutoRead()) {
channel.read();
}
return this;
}
}
channel.read()实际调用是AbstractChannel.read()方法
@Override
public Channel read() {
pipeline.read();
return this;
}
最终调用的是DefaultChannelPipeline的read(),调用了tail的read方法
public ChannelPipeline read() {
tail.read();
return this;
}
oubound请求read从tail->head往前传递
public ChannelHandlerContext read() {
invokedPrevRead = true;
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeRead();
} else {
Runnable task = next.invokeReadTask;
if (task == null) {
next.invokeReadTask = task = new Runnable() {
@Override
public void run() {
next.invokeRead();
}
};
}
executor.execute(task);
}
return this;
}
read请求传递到HeadContext的read方法
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
实际处理的是AbstractNioMessageChannel的内部类NioMessageUnsafe
public final void beginRead() {
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
调用的是AbstractNioChannel的doBeginRead(),做的事情其实修改selectionKey感兴趣的操作,增加对accept操作感兴趣
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
if (inputShutdown) {
return;
}
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
//readInterestOp=16=1<<4
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
暂停下分析一下eventLoop.execute(Runnable task)方法。上面可以看到当eventLoop.inEventLoop()不为true时就封装为OneTimeTask投递到eventLoop中执行,这样是为了所有相关操作都在channel对应的事件循环线程上执行,省去了锁同步实现无锁,该方法实际是SingleThreadEventExecutor类的方法
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
//首先当前线程是不是与SingleThreadEventExecutor实例绑定的线程是一致的
//如果一致则直接加入到taskQueue任务队列里面去
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
//判断是不是需要wakeup唤醒当前eventloop对应的selector (判断条件:未唤醒&&当前任务需要唤醒)因为有任务了就需要去执行,不能让selector一直阻塞在select上
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
//判断当前线程与eventloop的线程一致而且没唤醒过则唤醒selector
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
如果一致则直接加入到taskQueue任务队列里面去
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (isShutdown()) {
reject();
}
taskQueue.add(task);
}
如果不一致则先启动本eventloop对应的线程,然后在调用添加任务addTask方法添加任务到任务队列
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
schedule(new ScheduledFutureTask<Void>(
this, Executors.<Void>callable(new PurgeTask(), null),
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
thread.start();
}
}
}
那么启动NioEventLoop的线程的run方法在做什么呢?
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.parent = parent;
this.addTaskWakesUp = addTaskWakesUp;
//该线程是事件执行器对应的线程,上面startThread方法调用后会进入到该线程的run方法
//1.首先更新最后执行了多长时间
//2.调用SingleThreadEventExecutor的多态的abstract修饰的run方法,这个方法根据每个子类实现不同
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error(
"Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
taskQueue = newTaskQueue();
}
实际最终启动eventloop时执行的是NioEventLoop的run方法,这个就是reactor线程执行事件轮训和执行任务的主要逻辑。
protected void run() {
for (;;) {
boolean oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();
} else {
select(oldWakenUp);
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
}
然后Reactor线程的run方法那边一直在循环做select和处理提交的任务
网友评论