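Netty server example:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {
    public static void main(String[] args) throws Exception {
        // Create two thread groups, bossGroup and workerGroup. When no size is given, a group holds
        // twice as many NioEventLoop threads as there are CPU cores; here the sizes are set explicitly.
        // bossGroup only accepts connections (a single bound port is served by one of its threads);
        // the actual I/O and business processing for each client is handed over to workerGroup.
        EventLoopGroup bossGroup = new NioEventLoopGroup(3);
        EventLoopGroup workerGroup = new NioEventLoopGroup(8);
        try {
            // Create the server bootstrap object
            ServerBootstrap bootstrap = new ServerBootstrap();
            // Configure it with chained calls
            bootstrap.group(bossGroup, workerGroup) // set the two thread groups
                    // Use NioServerSocketChannel as the server channel implementation
                    .channel(NioServerSocketChannel.class)
                    // Set the size of the server's connection queue. The server accepts client connections
                    // one at a time, so connections that arrive while it is busy wait in this queue.
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // Channel initializer: runs once for each accepted SocketChannel, when that channel
                    // is registered with its event loop, and sets up its pipeline
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // Add handlers for the SocketChannel served by workerGroup.
                            // NettyServerHandler is the application's own handler (not shown here).
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("netty server start..");
            // bind() is asynchronous and returns a ChannelFuture, whose isDone() and similar methods
            // report the state of the operation; sync() blocks until the bind has completed.
            ChannelFuture cf = bootstrap.bind(9000).sync();
            // Register a listener on cf for the events we care about
            /*cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        System.out.println("listening on port 9000 succeeded");
                    } else {
                        System.out.println("listening on port 9000 failed");
                    }
                }
            });*/
            // Wait for the listening channel to close; closeFuture() is asynchronous.
            // sync() blocks here until the channel has been closed; internally it uses Object.wait().
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}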
1.NioEventLoopGroup and NioEventLoop
public NioEventLoopGroup() {
this(0);
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
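When no size is given (nThreads == 0), the thread count falls back to DEFAULT_EVENT_LOOP_THREADS, which defaults to twice the number of CPU cores.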
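As a rough illustration of that rule, the sketch below reproduces the fallback with plain JDK calls. It assumes the default is read from the `io.netty.eventLoopThreads` system property, falls back to twice the available processors, and is never less than 1; the class and method names are hypothetical and not part of Netty.

```java
final class DefaultThreadsSketch {
    // Hypothetical helper: the same rule as DEFAULT_EVENT_LOOP_THREADS, using only JDK calls.
    static int defaultEventLoopThreads() {
        int fallback = Runtime.getRuntime().availableProcessors() * 2;
        // Integer.getInteger reads a system property and uses the fallback when it is unset or unparsable.
        return Math.max(1, Integer.getInteger("io.netty.eventLoopThreads", fallback));
    }

    // Mirrors "nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads" from the constructor above.
    static int resolve(int nThreads) {
        return nThreads == 0 ? defaultEventLoopThreads() : nThreads;
    }

    public static void main(String[] args) {
        System.out.println("default = " + resolve(0) + ", explicit = " + resolve(3));
    }
}
```

Under that assumption, the default can be overridden without code changes by starting the JVM with `-Dio.netty.eventLoopThreads=N`.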
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
The part to focus on is newChild():
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
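NioEventLoop has two core components:
- 1)taskQueue, initialized in the parent-class constructor SingleThreadEventExecutor(); depending on the configuration it may be a LinkedBlockingQueue, a MpscUnboundedArrayQueue, or a MpscUnboundedAtomicArrayQueue
- 2)selectorTuple = openSelector(); which opens the Selector that this event loop polls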
2.ServerBootstrap
Configuration:
- ServerBootstrap#group(parentGroup, childGroup): sets this.group = parentGroup and this.childGroup = childGroup.
- AbstractBootstrap#channel: sets this.channelFactory.
- AbstractBootstrap#option: stores the channel options.
- ServerBootstrap#childHandler(i): sets this.childHandler.
2.1 The server registers for ACCEPT events on the selector and binds the port
AbstractBootstrap#bind(int)
- bind(SocketAddress)
- AbstractBootstrap#doBind
1)initAndRegister();
  1-1)channel = channelFactory.newChannel(); This calls ReflectiveChannelFactory#newChannel, which invokes the constructor of the configured class through constructor.newInstance(), i.e. the NioServerSocketChannel() constructor:
    this(newSocket(DEFAULT_SELECTOR_PROVIDER)); newSocket() calls SelectorProvider.provider().openServerSocketChannel() to create the ServerSocketChannel.
    pipeline = newChannelPipeline(); creates the DefaultChannelPipeline.
    super(null, channel, SelectionKey.OP_ACCEPT);
    this.readInterestOp = readInterestOp; records interest in ACCEPT events.
    ch.configureBlocking(false); switches the channel to non-blocking mode.
  1-2)init(channel); The key step adds one ChannelHandler to the pipeline: a one-shot ChannelInitializer whose job is to add a ServerBootstrapAcceptor handler and then remove itself. ServerBootstrapAcceptor initializes each client connection after it has been accepted.
  1-3)config().group().register(channel);
    MultithreadEventLoopGroup#register()
    SingleThreadEventLoop#register()
    AbstractChannel.AbstractUnsafe#register
    eventLoop.execute() submits a register0() task.
    AbstractNioChannel#doRegister calls selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
    pipeline.invokeHandlerAddedIfNeeded(); This invokes the ChannelInitializer#initChannel set up above, which adds ServerBootstrapAcceptor and removes itself. (See DefaultChannelPipeline#addLast() -> callHandlerCallbackLater(newCtx, true) -> PendingHandlerAddedTask -> ChannelInitializer#handlerAdded)
    pipeline.fireChannelRegistered();
    beginRead() calls the parent class's AbstractNioChannel#doBeginRead, which calls selectionKey.interestOps(interestOps | readInterestOp), i.e. registers interest in ACCEPT events. For the server channel this is reached once the channel becomes active, which normally means after the bind in step 2) has succeeded.
2)doBind0(regFuture, channel, localAddress, promise);
  channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
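The order of operations above (register with an empty interest set and an attachment, bind, then switch on OP_ACCEPT) can be tried out with plain JDK NIO. This is only a sketch of the sequence, not Netty's code: the class name is hypothetical, a bare Object stands in for the NioServerSocketChannel attachment, and binding to port 0 on the loopback address is an assumption made so the example does not depend on a fixed port.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class RegisterThenBindSketch {
    // Follows the walkthrough's order and returns the final interest set of the key.
    static int registerBindAndListen() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);                 // non-blocking, as in the channel constructor
            Object attachment = new Object();            // Netty attaches the channel object itself
            // doRegister: interest set 0, so nothing is selected yet
            SelectionKey key = ch.register(selector, 0, attachment);
            // doBind0: port 0 lets the OS pick any free port
            ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            // doBeginRead: add the read interest op, which is OP_ACCEPT for a server channel
            key.interestOps(key.interestOps() | SelectionKey.OP_ACCEPT);
            if (key.attachment() != attachment) {
                throw new IllegalStateException("attachment was not kept on the key");
            }
            return key.interestOps();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int ops = registerBindAndListen();
        System.out.println("OP_ACCEPT set: " + ((ops & SelectionKey.OP_ACCEPT) != 0));
    }
}
```

Registering with 0 first means the key and its attachment exist before any event can be delivered; interest in ACCEPT is only added once the channel is ready to handle it.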
2.2 NioEventLoop#run
First, a look at SingleThreadEventExecutor#execute
- 1)addTask(task); puts the task on the queue with taskQueue.offer(task);
- 2)startThread();
SingleThreadEventExecutor#doStartThread calls executor.execute() to run a Runnable whose core is:
SingleThreadEventExecutor.this.run();
which resolves to NioEventLoop#run
The executor above is the ThreadPerTaskExecutor created in the MultithreadEventExecutorGroup constructor shown earlier:
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
The key part is NioEventLoop#run
- 1)SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); at its core this calls int selectedKeys = selector.select(timeoutMillis);
- 2)processSelectedKeys();
- 3)runAllTasks(); takes tasks from taskQueue and runs them.
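The execute/run pair described above can be approximated in a few dozen lines of JDK code. The sketch below is a simplified stand-in rather than Netty's implementation: `MiniEventLoop` and its members are hypothetical names, it has no channels registered (so the key-processing step is only a placeholder), and it wakes the selector on every submission, which Netty avoids when it can.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class MiniEventLoop {
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private final Selector selector;
    private volatile boolean stopped;

    MiniEventLoop() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void execute(Runnable task) {
        taskQueue.offer(task);                        // 1) addTask
        if (started.compareAndSet(false, true)) {     // 2) startThread, at most once
            new Thread(this::run, "mini-event-loop").start();
        }
        selector.wakeup();                            // let a blocked select() return early
    }

    private void run() {
        try {
            while (!stopped) {
                selector.select(1000);                // 1) wait for I/O, a wakeup, or the timeout
                selector.selectedKeys().clear();      // 2) placeholder for processSelectedKeys()
                Runnable task;
                while ((task = taskQueue.poll()) != null) {
                    task.run();                       // 3) runAllTasks
                }
            }
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void shutdown() {
        execute(() -> stopped = true);
    }

    // Submits three tasks from the calling thread and returns the order in which the loop ran them.
    static List<Integer> demo() {
        MiniEventLoop loop = new MiniEventLoop();
        List<Integer> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 1; i <= 3; i++) {
            int n = i;
            loop.execute(() -> { seen.add(n); done.countDown(); });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        loop.shutdown();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The thread is created lazily on the first execute() call, which matches the idea that an event loop that never receives work never starts a thread.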
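Lock-free serial design
After a NioEventLoop reads a message, it calls the ChannelPipeline's fireChannelRead(Object msg) directly. As long as the user does not switch threads explicitly, the same NioEventLoop thread carries the call all the way into the user's handlers, with no thread hand-off in between. This serial processing avoids the lock contention that multi-threaded access would cause, which is why it performs well.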
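The effect of serializing all work onto one thread can be shown without Netty. In the sketch below a single-threaded executor stands in for one NioEventLoop, and several caller threads submit increments to it; the counter is a plain field with no lock or atomic, yet no update is lost because only the one loop thread ever touches it. The class name and the numbers are illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SerialCounterSketch {
    private int count; // plain field: only ever touched by the single "event loop" thread

    static int run(int producers, int perProducer) {
        SerialCounterSketch state = new SerialCounterSketch();
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(); // stands in for one NioEventLoop
        ExecutorService callers = Executors.newFixedThreadPool(producers);
        try {
            CountDownLatch submitted = new CountDownLatch(producers);
            for (int p = 0; p < producers; p++) {
                callers.execute(() -> {
                    for (int i = 0; i < perProducer; i++) {
                        eventLoop.execute(() -> state.count++); // the mutation runs on the loop thread
                    }
                    submitted.countDown();
                });
            }
            submitted.await();
            // Reading through the same executor runs after every increment queued before it.
            return eventLoop.submit(() -> state.count).get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            callers.shutdown();
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000));
    }
}
```

The same reasoning explains why a handler that blocks for a long time is a problem in this model: it stalls every channel that shares that event loop.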
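3.The pipeline chain of responsibility
The call flow through the pipeline's chain of responsibility:
- DefaultChannelPipeline#fireChannelRegistered
- AbstractChannelHandlerContext.invokeChannelRegistered(head);
- calls head.invokeChannelRegistered()
- calls HeadContext#channelRegistered
- the key step is findContextInbound(MASK_CHANNEL_REGISTERED), which finds the next context whose handler matches MASK_CHANNEL_REGISTERED and then calls AbstractChannelHandlerContext.invokeChannelRegistered() again; repeating this is what implements the chain of responsibility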
AbstractChannelHandlerContext.invokeChannelRegistered:
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
AbstractChannelHandlerContext#invokeChannelRegistered()
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
DefaultChannelPipeline.HeadContext#channelRegistered
public void channelRegistered(ChannelHandlerContext ctx) {
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}
AbstractChannelHandlerContext#fireChannelRegistered
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
return this;
}
The key method is AbstractChannelHandlerContext#findContextInbound:
- it walks forward through the AbstractChannelHandlerContext list until it finds one whose executionMask matches mask
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while ((ctx.executionMask & mask) == 0);
return ctx;
}
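To see how the mask test skips handlers, here is a small self-contained model of the same loop. The context class, the mask bit values, and the handler names are hypothetical; the sketch only assumes, as in the excerpt above, that each context carries an executionMask and a next pointer, and that the last context matches every inbound event so the loop always terminates.

```java
final class MaskChainSketch {
    // Hypothetical bit values; only their distinctness matters here.
    static final int MASK_CHANNEL_REGISTERED = 1;
    static final int MASK_CHANNEL_READ = 1 << 1;

    static final class Ctx {
        final String name;
        final int executionMask;
        Ctx next;

        Ctx(String name, int executionMask) {
            this.name = name;
            this.executionMask = executionMask;
        }

        // Same loop shape as findContextInbound: advance until a context handles the event.
        Ctx findContextInbound(int mask) {
            Ctx ctx = this;
            do {
                ctx = ctx.next;
            } while ((ctx.executionMask & mask) == 0);
            return ctx;
        }
    }

    // Links the contexts in order and returns the first one.
    static Ctx link(Ctx... ctxs) {
        for (int i = 0; i + 1 < ctxs.length; i++) {
            ctxs[i].next = ctxs[i + 1];
        }
        return ctxs[0];
    }

    // Returns the name of the first context after head that handles the given event.
    static String firstHandlerFor(int mask) {
        int all = MASK_CHANNEL_REGISTERED | MASK_CHANNEL_READ;
        Ctx head = link(
                new Ctx("head", all),
                new Ctx("decoder", MASK_CHANNEL_READ),   // does not override channelRegistered
                new Ctx("business", all),
                new Ctx("tail", all));                   // matches everything, so the loop ends
        return head.findContextInbound(mask).name;
    }

    public static void main(String[] args) {
        System.out.println(firstHandlerFor(MASK_CHANNEL_REGISTERED));
        System.out.println(firstHandlerFor(MASK_CHANNEL_READ));
    }
}
```

A channelRegistered event skips "decoder" and lands on "business", while a channelRead event is delivered to "decoder" first.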
4.The server Channel registers for and handles ACCEPT events
NioEventLoop#processSelectedKeys
private void processSelectedKeys() {
if (selectedKeys != null) {
//Avoids the JDK's selector.selectedKeys(); slightly better performance (1%-2%) and less garbage
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
//Corresponds to the `this` attachment passed when the channel registered, e.g. selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
//Handle a read (including a peer disconnect) or accept a new connection
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
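The part to focus on is how the server handles SelectionKey.OP_ACCEPT: unsafe.read();
The actual implementation is AbstractNioMessageChannel.NioMessageUnsafe#read
- 1)doReadMessages() first calls serverSocketChannel.accept() and wraps the result in a NioSocketChannel.
  The NioSocketChannel constructor creates a DefaultChannelPipeline, records interest in SelectionKey.OP_READ (assigned to this.readInterestOp), and switches the channel to non-blocking mode with ch.configureBlocking(false).
- 2)pipeline.fireChannelRead(readBuf.get(i)); This triggers the handlers in the server channel's pipeline; the key one is ServerBootstrapAcceptor#channelRead().
  2-1)child.pipeline().addLast(childHandler) adds the ChannelInitializer written during server setup to the pipeline of the client's SocketChannel;
  2-2)childGroup.register(child).addListener() works much like the server channel registration described above.
    A)Interest in read events is registered on the selector of one NioEventLoop in workerGroup (this happens in pipeline.fireChannelActive() -> DefaultChannelPipeline.HeadContext#read -> AbstractChannel.AbstractUnsafe#beginRead, which registers the read interest op passed in when the channel was constructed); the NioEventLoop#run loop then keeps polling for that event;
    B)ChannelInitializer#initChannel adds NettyServerHandler and removes itself.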
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
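Step 2-2 above registers each accepted channel with one NioEventLoop of workerGroup; which one is decided by the `chooser` created in the MultithreadEventExecutorGroup constructor. As far as I know it hands out loops in round-robin order, with a bit-mask shortcut when the group size is a power of two. The sketch below models that idea with strings standing in for event loops; the class and its names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinChooserSketch {
    private final String[] loops;                       // stand-ins for the NioEventLoop children
    private final AtomicInteger idx = new AtomicInteger();

    RoundRobinChooserSketch(String... loops) {
        this.loops = loops;
    }

    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    // Returns the next loop in round-robin order; safe to call from several threads.
    String next() {
        int i = idx.getAndIncrement();
        if (isPowerOfTwo(loops.length)) {
            return loops[i & (loops.length - 1)];       // cheap modulo for power-of-two sizes
        }
        return loops[Math.abs(i % loops.length)];       // abs() guards against counter overflow
    }

    public static void main(String[] args) {
        RoundRobinChooserSketch chooser = new RoundRobinChooserSketch("w0", "w1", "w2");
        for (int i = 0; i < 5; i++) {
            System.out.println(chooser.next());
        }
    }
}
```

Once a channel is assigned to a loop it stays there for its lifetime, so this choice determines how evenly connections are spread across the worker threads.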
5.Handling READ events on the client Channel
NioEventLoop#processSelectedKey()
-> AbstractNioByteChannel.NioByteUnsafe#read
- 1)byteBuf = allocHandle.allocate(allocator); allocates the byteBuf
- 2)allocHandle.lastBytesRead(doReadBytes(byteBuf)); reads data from the channel;
- 3)pipeline.fireChannelRead(byteBuf) runs the pipeline; this is where the business logic is executed
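The three steps above (allocate a buffer, read from the channel into it, pass it down the pipeline) look roughly like this in plain JDK NIO. It is only an analogy: a `Pipe` stands in for the socket so the example needs no network, a `Consumer<ByteBuffer>` stands in for the pipeline, and the fixed size of 1024 replaces the adaptive guess() that Netty uses. All names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

final class ReadLoopSketch {
    // One read pass: allocate, read, hand the bytes to the "pipeline".
    static int readOnce(ReadableByteChannel ch, int guess, Consumer<ByteBuffer> pipeline)
            throws IOException {
        ByteBuffer buf = ByteBuffer.allocateDirect(guess);   // 1) allocate (direct memory)
        int n = ch.read(buf);                                 // 2) read from the channel
        if (n > 0) {
            buf.flip();                                       // switch the buffer to reading mode
            pipeline.accept(buf);                             // 3) stand-in for fireChannelRead
        }
        return n;
    }

    // Writes the payload into a pipe and returns what the "pipeline" received.
    static String demo(String payload) {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
                sink.write(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)));
                StringBuilder received = new StringBuilder();
                readOnce(source, 1024, buf -> received.append(StandardCharsets.UTF_8.decode(buf)));
                return received.toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("hello netty"));
    }
}
```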
6.Direct memory, zero copy, and the ByteBuf memory pool
The byteBuf allocation above already uses direct memory:
RecvByteBufAllocator.DelegatingHandle#allocate
public ByteBuf allocate(ByteBufAllocator alloc) {
return delegate.allocate(alloc);
}
DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#allocate
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}
AbstractByteBufAllocator#ioBuffer(int)
public ByteBuf ioBuffer(int initialCapacity) {
if (PlatformDependent.hasUnsafe() || isDirectBufferPooled()) {
return directBuffer(initialCapacity);
}
return heapBuffer(initialCapacity);
}
PooledByteBufAllocator#newDirectBuffer
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
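Pros and cons of direct memory:
Pros:
- It does not take up heap space, which reduces GC pressure.
- In the JVM's implementation, native I/O operates on direct memory (direct memory => system call => disk/NIC), whereas heap memory needs an extra copy (heap memory => direct memory => system call => disk/NIC).
Cons:
- Initial allocation is slower.
- The JVM does not manage this memory the way it manages the heap, so it is easier to run out of memory. To keep direct memory from exhausting physical memory when no full GC happens for a long time, cap it with -XX:MaxDirectMemorySize; when the limit is reached, the JDK calls System.gc() to trigger a full GC, which indirectly reclaims direct buffers that are no longer referenced.
Allocating and freeing off-heap direct memory is expensive. To reuse buffers as much as possible, Netty provides a pooling mechanism for ByteBuf: take a ByteBuf from the pool when one is needed and return it to the pool when done.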
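The reuse idea can be illustrated with a deliberately tiny pool of direct `ByteBuffer`s. This is far simpler than Netty's pooled allocator (no arenas, size classes, or per-thread caches) and the class is hypothetical; it only shows that a released buffer is handed out again instead of triggering a new direct allocation.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

final class DirectBufferPoolSketch {
    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
    private final int bufferSize;
    private int allocations;   // how many times direct memory was actually allocated

    DirectBufferPoolSketch(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    // Hands out a pooled buffer if one is free, otherwise allocates a new direct buffer.
    synchronized ByteBuffer acquire() {
        ByteBuffer buf = free.pollFirst();
        if (buf == null) {
            allocations++;
            buf = ByteBuffer.allocateDirect(bufferSize);
        }
        buf.clear();           // reset position and limit before reuse
        return buf;
    }

    // Returns a buffer to the pool; the caller must not use it afterwards.
    synchronized void release(ByteBuffer buf) {
        free.addFirst(buf);
    }

    synchronized int allocations() {
        return allocations;
    }

    public static void main(String[] args) {
        DirectBufferPoolSketch pool = new DirectBufferPoolSketch(256);
        ByteBuffer first = pool.acquire();
        pool.release(first);
        ByteBuffer second = pool.acquire();
        System.out.println("reused: " + (first == second) + ", allocations: " + pool.allocations());
    }
}
```

The flip side of pooling is that a buffer which is never released is never reused, which is why pooled ByteBufs in Netty have to be released explicitly.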
PooledByteBufAllocator#newDirectBuffer
-> PoolArena#allocate()
-> PoolArena.DirectArena#newByteBuf
-> PooledUnsafeDirectByteBuf#newInstance
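-> Finally the ByteBuf object itself is obtained from RECYCLER, an object pool of ByteBuf instances (the memory behind it is then assigned by allocate(cache, buf, reqCapacity)); a non-pooled implementation simply creates a new ByteBuf object.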
PoolArena#allocate()
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
allocate(cache, buf, reqCapacity);
return buf;
}
PoolArena.DirectArena#newByteBuf
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
if (HAS_UNSAFE) {
return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
} else {
return PooledDirectByteBuf.newInstance(maxCapacity);
}
}
PooledUnsafeDirectByteBuf#newInstance
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
PooledUnsafeDirectByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}
7.ByteBuf expansion mechanism
ByteBuf.writeByte()->AbstractByteBuf
AbstractByteBuf#writeByte
public ByteBuf writeByte(int value) {
ensureWritable0(1);
_setByte(writerIndex++, value);
return this;
}
AbstractByteBuf#ensureWritable0
final void ensureWritable0(int minWritableBytes) {
ensureAccessible();
if (minWritableBytes <= writableBytes()) {
return;
}
final int writerIndex = writerIndex();
if (checkBounds) {
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
}
// Normalize the current capacity to the power of 2.
int minNewCapacity = writerIndex + minWritableBytes;
int newCapacity = alloc().calculateNewCapacity(minNewCapacity, maxCapacity);
int fastCapacity = writerIndex + maxFastWritableBytes();
// Grow by a smaller amount if it will avoid reallocation
if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) {
newCapacity = fastCapacity;
}
// Adjust to the new capacity.
capacity(newCapacity);
}
AbstractByteBufAllocator#calculateNewCapacity
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
checkPositiveOrZero(minNewCapacity, "minNewCapacity");
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
// If over threshold, do not double but just increase by threshold.
if (minNewCapacity > threshold) {
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}
// Not over threshold. Double up to 4 MiB, starting from 64.
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}
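Netty's ByteBuf grows dynamically as needed. The threshold is 4 MiB (an empirical value; other scenarios might choose differently), and expansion works as follows:
- If the required capacity equals the threshold, the threshold is used as the new buffer capacity.
- If the required capacity is greater than the threshold, capacity grows in 4 MiB steps: the required capacity is rounded down to a multiple of 4 MiB ((required / 4 MiB) * 4 MiB) and another 4 MiB is added, unless that would exceed maxCapacity, in which case maxCapacity is used.
- If the required capacity is less than the threshold, capacity doubles starting from 64 bytes (64 -> 128 -> 256 ...) until the result is greater than or equal to the required capacity; the result is also capped at maxCapacity.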
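A few worked values make the three cases concrete. The method below restates the calculateNewCapacity logic shown above in a standalone class so it can be run without Netty; the class name is hypothetical and the argument check is condensed into one condition.

```java
final class CapacitySketch {
    static final int THRESHOLD = 4 * 1024 * 1024; // 4 MiB, the value of CALCULATE_THRESHOLD

    static int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
        if (minNewCapacity < 0 || minNewCapacity > maxCapacity) {
            throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity);
        }
        if (minNewCapacity == THRESHOLD) {
            return THRESHOLD;                                        // case 1: exactly 4 MiB
        }
        if (minNewCapacity > THRESHOLD) {                            // case 2: step by 4 MiB
            int newCapacity = minNewCapacity / THRESHOLD * THRESHOLD;
            if (newCapacity > maxCapacity - THRESHOLD) {
                newCapacity = maxCapacity;
            } else {
                newCapacity += THRESHOLD;
            }
            return newCapacity;
        }
        int newCapacity = 64;                                        // case 3: double from 64
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }
        return Math.min(newCapacity, maxCapacity);
    }

    public static void main(String[] args) {
        int max = Integer.MAX_VALUE;
        System.out.println(calculateNewCapacity(65, max));             // doubles past 64
        System.out.println(calculateNewCapacity(THRESHOLD + 1, max));  // next 4 MiB step
        System.out.println(calculateNewCapacity(5_000_000, 6_000_000)); // capped by maxCapacity
    }
}
```

For example, a request for 65 bytes yields 128, a request for 4 MiB + 1 byte yields 8 MiB, and a request for 5,000,000 bytes with maxCapacity 6,000,000 yields 6,000,000 because one more 4 MiB step would overshoot the limit.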
8.Handling the empty-polling bug
NioEventLoop#select
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
//Compute the select timeout from the deadline of the next scheduled task.
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
if (nextWakeupTime != normalizedDeadlineNanos) {
nextWakeupTime = normalizedDeadlineNanos;
}
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) { //A scheduled task is already due, or the maximum wait time has passed
if (selectCnt == 0) {
//Non-blocking; returns 0 when nothing is ready
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
//The select below blocks, but another thread can still wake it up
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
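If a select returns with no ready keys, no wakeup, and no new work to process, that is an empty poll; when it repeats in a tight loop, CPU usage goes to 100%.
Netty's solution:
- 1. Count select operations over a period: every empty select increments a counter, and if N empty polls happen in a row within that period, the epoll spin bug is considered triggered.
- 2. Rebuild the Selector: check whether the rebuild was requested from another thread (such a request is handed to the event loop thread first); if not, deregister the existing SocketChannels from the old Selector, re-register them on a new Selector, and close the old one.
The steps in detail:
- 1. Record the current time in currentTimeNanos.
- 2. Compute timeoutMillis, the minimum time a select should take when nothing is ready.
- 3. Increment selectCnt after every select.
- 4. If the select took at least that long, reset selectCnt to 1 (this filters out selects that simply timed out).
- 5. Once selectCnt reaches SELECTOR_AUTO_REBUILD_THRESHOLD, rebuild the selector to work around the problem.
- 6. The threshold defaults to 512.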
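The counting rule in steps 3 to 6 can be isolated from the rest of select() and exercised on its own. The sketch below is a simplified model with hypothetical names: the caller reports how long each select took and what timeout it asked for, and the detector answers whether a rebuild should happen. It leaves out the other exits of the real loop (keys selected, pending tasks, interruption).

```java
final class SelectSpinDetector {
    static final int DEFAULT_REBUILD_THRESHOLD = 512;

    private final int threshold;
    private int selectCnt;

    SelectSpinDetector(int threshold) {
        this.threshold = threshold;
    }

    // Call after every select() that returned with nothing to do.
    // Returns true when the selector should be rebuilt.
    boolean onSelectReturned(long elapsedNanos, long timeoutNanos) {
        selectCnt++;
        if (elapsedNanos >= timeoutNanos) {
            selectCnt = 1;              // an ordinary timeout, not a premature return
            return false;
        }
        if (threshold > 0 && selectCnt >= threshold) {
            selectCnt = 1;              // start counting again on the rebuilt selector
            return true;
        }
        return false;
    }

    // Counts how many back-to-back premature returns it takes to trigger a rebuild.
    static int prematureReturnsUntilRebuild(int threshold) {
        if (threshold <= 0) {
            throw new IllegalArgumentException("threshold must be positive");
        }
        SelectSpinDetector detector = new SelectSpinDetector(threshold);
        int calls = 0;
        while (true) {
            calls++;
            if (detector.onSelectReturned(0L, 1_000_000L)) {
                return calls;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(prematureReturnsUntilRebuild(DEFAULT_REBUILD_THRESHOLD));
    }
}
```

Because a select that really waited out its timeout resets the counter, only an uninterrupted run of premature returns can reach the threshold.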
References
- Tuling (图灵) VIP course, https://vip.tulingxueyuan.cn/