1. boss event loop
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
- 关心SelectionKey.OP_READ | SelectionKey.OP_ACCEPT
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 校验有效性
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
// 关闭连接
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// SelectionKey.OP_READ | SelectionKey.OP_ACCEPT
// 连接或者读取
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
2. NioMessageUnsafe
- 获得客户端通信SocketChannel, 使用NioSocketChannel包装
- 执行handler 链channelRead方法
- 执行handler 链 channelReadComplete方法
handler 链: HeadContext、ServerBootstrapAcceptor、TailContext
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
// 读取策略类
// HandlerImpml extends MaxMessageHandle
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
// 重置读取信息
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// NioServerSocketChannel 实现
// 获得客户端通信SocketChannel, 使用NioSocketChannel包装
// 添加到readBuf中(ArrayList<NioSocketChannel>())
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// message count + 1
allocHandle.incMessagesRead(localRead);
}
// 默认false
while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 执行handler 链channelRead方法
// NioSocketChannel = readBuf.get(i)
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
// 执行handler 链 channelReadComplete方法
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
2.1 HeadContext.channelRead
继续下一个handler
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
2.2 ServerBootstrapAcceptor.channelRead
初始化
不继续执行TailContext.channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
// 添加ChannelInitializer handler
// 在NioSocketChannel register后添加HandshakeHandler、NettyEncoder、NettyDecoder、IdleStateHandler
// NettyConnectManageHandler、NettyServerHandler
// .childHandler(new ChannelInitializer<SocketChannel>() {
// @Override
// public void initChannel(SocketChannel ch) {
// ch.pipeline()
// .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
// new HandshakeHandler(TlsSystemConfig.tlsMode))
// .addLast(defaultEventExecutorGroup,
// new NettyEncoder(),
// new NettyDecoder(),
// new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
// new NettyConnectManageHandler(),
// new NettyServerHandler()
// );
// }
// }
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
- child为NioSocketChannel,后续的操作都是在此类中进行
3. AbstractUnsafe.register
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
// promise 是否被取消
// java channel是否是open状态
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// 注册标记
boolean firstRegistration = neverRegistered;
// javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// 执行DefaultChannelPipeline中
// PendingHandlerCallback包装的ChannelHander的handlerAdded方法
// 如果ChannelHander extends ChannelInitializer执行handlerAdded
// 后会从DefaultChannelPipeline移除此ChannelHander
pipeline.invokeHandlerAddedIfNeeded();
// 设置成功, notify 注册的listener
safeSetSuccess(promise);
// 从head开始执行DefaultChannelPipeline的ChannelHandler链
// 此示例: 执行完handlerAdded后方法后, 仅剩下默认的HeadContext -> TailContext
// 添加ServerBootstrapAcceptor还在任务队列没有执行
// 这两个ChannlHandler, 没有任何操作
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
// javaChannel.socket().isBound()
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
- 执行register
- 执行NettyRemotingServer ChannelInitializer, 添加了HandshakeHandler、NettyEncoder、NettyDecoder、
IdleStateHandler、NettyConnectManageHandler、NettyServerHandler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler()
);
}
});
image.png
- 因为channel已经register, 所以NettyRemotingServer ChannelInitializer添加的handler执行handlerAdd方法
只有IdleStateHandler在handlerAdded方法中执行了代码
image.png
- channelRegistered也只是IdleStateHandler有执行代码,其他都是next handler继续执行
- channelActive
HeadContext
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
NettyConnectManageHandler
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
super.channelActive(ctx);
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
}
}
IdleStateHandler单独介绍
其他handler都是ctx.fireChannelUnregistered();
3.2 readIfIsAutoRead()
private void readIfIsAutoRead() {
// 默认为true
if (channel.config().isAutoRead()) {
channel.read();
}
}
channel.read()
public Channel read() {
pipeline.read();
return this;
}
pipeline.read()
public final ChannelPipeline read() {
tail.read();
return this;
}
tail.read(); 查找handler链, ChannelOutboundHandler类型的handler执行read方法
HeadContext.read
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
NioSocketChannel注册读事件, 注册到了worker NioEventLoop的Selector
4. read
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
4.1 NioByteUnsafe.read
public final void read() {
// NioSocketChannelConfig
final ChannelConfig config = config();
// 校验SocketChannel有效性
// (javaChannel().socket().isInputShutdown() || !isActive())
// 并且
// (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config))
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
// ByteBuf分配策略(默认PooledByteBufAllocator)
final ByteBufAllocator allocator = config.getAllocator();
// ByteBuf分配辅助类(HandleImpl extends MaxMessageHandle)
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
// 重置初始化信息(统计信息、最大读取信息)
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
// 从SocketChannel中读取数据
// 并增加totalBytesRead大小
allocHandle.lastBytesRead(doReadBytes(byteBuf));
// 判断此次读取到的数据大小
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
// 没有读取到数据, 释放ByteBuf
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
// 读取记录+1
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
-
ByteBuf默认
image.png -
doReadBytes(byteBuf) 后
image.png
4.1.1 DefaultChannelPipeline.fireChannelRead
- HeadContext没有任何操作直接下一个
-
HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf>
判断第一个字节数据类型, 从handler链中删除自己, fire next
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
// 判断msg是不是泛型执行的类型(ByteBuf)
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
// 执行实现类
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// mark the current position so that we can peek the first byte to determine if the content is starting with
// TLS handshake
msg.markReaderIndex();
byte b = msg.getByte(0);
if (b == HANDSHAKE_MAGIC_CODE) {
switch (tlsMode) {
case DISABLED:
ctx.close();
log.warn("Clients intend to establish a SSL connection while this server is running in SSL disabled mode");
break;
case PERMISSIVE:
case ENFORCING:
if (null != sslContext) {
ctx.pipeline()
.addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
.addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
log.info("Handlers prepended to channel pipeline to establish SSL connection");
} else {
ctx.close();
log.error("Trying to establish a SSL connection but sslContext is null");
}
break;
default:
log.warn("Unknown TLS mode");
break;
}
} else if (tlsMode == TlsMode.ENFORCING) {
ctx.close();
log.warn("Clients intend to establish an insecure connection while this server is running in SSL enforcing mode");
}
// reset the reader index so that handshake negotiation may proceed as normal.
msg.resetReaderIndex();
try {
// Remove this handler
ctx.pipeline().remove(this);
} catch (NoSuchElementException e) {
log.error("Error while removing HandshakeHandler", e);
}
// Hand over this message to the next .
ctx.fireChannelRead(msg.retain());
}
}
- NettyDecoder extends LengthFieldBasedFrameDecoder 就是解码了
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
ByteBuffer byteBuffer = frame.nioBuffer();
return RemotingCommand.decode(byteBuffer);
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
} finally {
if (null != frame) {
frame.release();
}
}
return null;
}
- NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand>, 业务处理
网友评论