开篇
- 本文基于netty-4.1.8.Final版本进行分析,主要是分析Netty Server初始化过程。
- 建议先参考Netty源码分析 - Bootstrap客户端文章。
- 核心点在于能够理解pipeline的串行调用的执行过程。
基本概念
pipeline-
channel、pipeline、context、handler的关系图如上所示,handler由context进行封装,由双向链表的数据结构进行连接。
-
当Channel对象在构造的时候会同时创建一个ChannelPipeline对象,两个对象相互关联,是一对一的关系,ChannelPipeline不会被多个Channel共享。
-
ChannelPipeline对象创建之后会调用它的各种添加handler的方法向链中加入ChannelHandler对象,而在加入ChannelHandler对象的同时,会自动给每个ChannelHandler包装一个ChannelHandlerContext对象。
-
ChannelHandlerContext是ChannelHandler的上下文信息,它使得ChannelHandler可以和ChannelPipeline以及其它的ChannelHandler对象进行交互操作。通过ChannelHandlerContext对象,ChannelHandler可以通知同一个pipeline中的其他ChannelHandler,也可以在运行时动态改变ChannelPipeline中的内容。
输入消息处理
- 当输入消息触发的时候,例如registred,active,read或readComplete等输入的消息触发的时候,会通过Channel调用对应的ChannelPipeline的对应方法来处理,输入消息会首先通过head找到下一个ChanneInbountHandler来处理输入消息,然后逐一传递到下一个ChanneInbountHandler消息,直至到最后一个内置的tail处理器。
输出消息处理
- 当输入消息触发的时候,例如bind,connect,write等输出消息触发的时候,会通过Channel调用对应的ChannelPipeline的对应方法来处理,输入消息会首先通过tail找到下一个ChanneOutbountHandler来处理输入消息,然后逐一传递到下一个ChanneOutbountHandler消息,直至到最后一个内置的head处理器。
pipeline的handler添加过程
- pipeline的hanndler添加过程分为两个阶段,第一个阶段为添加ChannelInitializer对应的handler,在Netty Client/Server在初始化channel的时候会执行;第二阶段为执行ChannelInitializer对象内部方法initChannel()的时候添加对应handler,在注册channel的时候会执行。
DiscardClient例子
public final class DiscardClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
// 绑定handler对象
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
p.addLast(new DiscardClientHandler());
}
});
// Make the connection attempt.
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
handler添加过程
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
volatile EventLoopGroup group;
@SuppressWarnings("deprecation")
private volatile ChannelFactory<? extends C> channelFactory;
private volatile SocketAddress localAddress;
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
private volatile ChannelHandler handler;
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1. newChannel创建对应的channel对象
channel = channelFactory.newChannel();
// 2. init真正执行的是Bootstrap的init()方法
init(channel);
} catch (Throwable t) {
// 省略代码
}
// 3. 执行register()动作
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
}
- AbstractBootstrap#initAndRegister执行操作三部曲:创建channel,初始化channel,注册channel。
- channelFactory.newChannel()创建channel。
- init(channel)初始化channel,init(channel)在子类Bootstrap被重写。
- config().group().register(channel)注册channel。
init过程
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
// 绑定ChannelInitializer到ChannelPipeline对象当中
p.addLast(config.handler());
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
}
}
}
- Bootstrap#init的核心操作在于将Bootstrap的handler添加到channel对应的pipeline当中。
- config.handler()返回的handler是DiscardClient中handler(new ChannelInitializer<SocketChannel>())绑定的handler对象。
- 划重点,在这里将ChannelHandler对象ChannelInitializer添加对应的pipeline当中
public class DefaultChannelPipeline implements ChannelPipeline {
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
// 1.channelHandler包装成DefaultChannelHandlerContext对象
newCtx = newContext(group, filterName(name, handler), handler);
// 2.添加DefaultChannelHandlerContext对象到pipeline当中。
addLast0(newCtx);
// 3.针对未注册的逻辑添加回调函数callHandlerCallbackLater
if (!registered) {
newCtx.setAddPending();
// 添加回调函数callHandlerCallbackLater
callHandlerCallbackLater(newCtx, true);
return this;
}
// 省略代码
return this;
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}
}
- ChannelHandler添加到pipeline流程包含3个步骤:创建HandlerContext对象;添加HandlerContext对象到pipeline当中;注册callHandlerCallbackLater的回调task。
- 创建HandlerContext对象,newContext(group, filterName(name, handler), handler)。
- 添加HandlerContext对象到pipeline中,addLast0(newCtx)。
- 注册callHandlerCallbackLater的回调task,callHandlerCallbackLater(newCtx, true),入参参数为newCtx(封装channelHandler对象)。
- Channel对象初始化后pipeline的状态如上图所示,增加了ChannelInitializer的这个handler对象。
register过程
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
protected abstract class AbstractUnsafe implements Unsafe {
private void register0(ChannelPromise promise) {
try {
// 根据neverRegistered的标识判断是否执行invokeHandlerAddedIfNeeded。
boolean firstRegistration = neverRegistered;
// 1.执行channel到eventLoop的绑定动作
doRegister();
neverRegistered = false;
registered = true;
// 2.执行ChannelInitializer的初始化动作
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
// 3.执行pipeline的fireChannelRegistered()方法
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
// 4.第一次注册会执行pipeline.fireChannelActive()的操作。
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
}
}
}
}
- channel的注册过程包含4部曲,分别是绑定channel到eventLoop;执行invokeHandlerAddedIfNeeded动作;执行fireChannelRegistered动作;执行fireChannelActive动作。
- invokeHandlerAddedIfNeeded负责回调init过程中生成的PendingHandlerCallback对象,会执行ChannelInitializer对应的handler的initChannel方法。
- fireChannelRegistered动作触发channel的状态变为ChannelRegistered。
- fireChannelActive动作触发channel的状态变为ChannelActive。
invokeHandlerAddedIfNeeded
public class DefaultChannelPipeline implements ChannelPipeline {
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
// We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
// that were added before the registration was done.
callHandlerAddedForAllHandlers();
}
}
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
this.pendingHandlerCallbackHead = null;
}
// task是PendingHandlerAddedTask
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}
private final class PendingHandlerAddedTask extends PendingHandlerCallback {
// PendingHandlerAddedTask的入参是ChannelInitializer对应的ctx对象
PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
super(ctx);
}
@Override
public void run() {
callHandlerAdded0(ctx);
}
}
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
// ctx.handler()返回的是ChannelInitializer对象
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
} catch (Throwable t) {
}
}
}
- invokeHandlerAddedIfNeeded的执行按照下面的顺序进行执行 invokeHandlerAddedIfNeeded => callHandlerAddedForAllHandlers => PendingHandlerAddedTask#execute => PendingHandlerAddedTask#callHandlerAdded0 => ChannelInitializer#handlerAdded。
- ctx.handler()返回的是ChannelInitializer的handler对象。
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
// initChannel会添加新的handler
initChannel((C) ctx.channel());
} catch (Throwable cause) {
} finally {
// 移除ChannelInitializer对应的handler。
remove(ctx);
}
return true;
}
return false;
}
private void remove(ChannelHandlerContext ctx) {
try {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
} finally {
initMap.remove(ctx);
}
}
}
- ChannelInitializer#handlerAdded执行对应的initChannel()方法,完成ChannelInitializer的initChannel()方法并添加新的handler。
- remove(ctx)会移除ChannelInitializer对应的handler。
- 执行ChannelInitializer#initChannel添加新handler同时移除ChannelInitializer自身handler的pipeline如上图。
输入消息处理流程
ChannelInboundHandler- channelRegistered 注册事件,channel注册到EventLoop上后调用,例如服务岗启动时,pipeline.fireChannelRegistered();
- channelUnregistered 注销事件,channel从EventLoop上注销后调用,例如关闭连接成功后,pipeline.fireChannelUnregistered();
- channelActive 激活事件,绑定端口成功后调用,pipeline.fireChannelActive();
- channelInactive非激活事件,连接关闭后调用,pipeline.fireChannelInactive();
- channelRead 读事件,channel有数据时调用,pipeline.fireChannelRead();
- channelReadComplete 读完事件,channel读完之后调用,pipeline.fireChannelReadComplete();
- channelWritabilityChanged 可写状态变更事件,当一个Channel的可写的状态发生改变的时候执行,可以保证写的操作不要太快,防止OOM,pipeline.fireChannelWritabilityChanged();
- userEventTriggered 用户事件触发,例如心跳检测,ctx.fireUserEventTriggered(evt);
- exceptionCaught 异常事件说明:我们可以看出,Inbound事件都是由I/O线程触发,用户实现部分关注的事件被动调用。
- ChannelInboundInvoker负责fire上述的各类事件。
pipeline.fireChannelRead()读事件流程
public final class NioEventLoop extends SingleThreadEventLoop {
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 省略相关代码
try {
// 省略相关代码
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
}
- NioEventLoop关注OP_READ|OP_ACCEPT两类事件,都属于输入事件类型。
- 进入unsafe.read()进入读取操作,unsafe为AbstractNioByteChannel#NioByteUnsafe。
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
protected class NioByteUnsafe extends AbstractNioUnsafe {
public final void read() {
// 省略相关代码
try {
do {
pipeline.fireChannelRead(byteBuf);
} while (allocHandle.continueReading());
} catch (Throwable t) {
} finally {
}
}
}
}
public class DefaultChannelPipeline implements ChannelPipeline {
public final ChannelPipeline fireChannelRead(Object msg) {
// 从pipeline对象当中的head开始执行遍历
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
}
}
}
- AbstractNioByteChannel#NioByteUnsafe的read触发pipeline.fireChannelRead()操作,进入pipeline的事件触发流程,参数head表示从pipeline的head开始遍历。
- AbstractChannelHandlerContext#invokeChannelRead属于静态方法,个人认为是AbstractChannelHandlerContext提供的为ChannelContext执行的入口。
- AbstractChannelHandlerContext#invokeChannelRead方法内部执行next.invokeChannelRead进行了Context的执行域,第一次Context为HeadContext对象。
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// handler()返回Context对象,第一次返回HeadContext对象。
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
}
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
}
- ((ChannelInboundHandler) handler()).channelRead(this, msg)的handler()方法返回的是Context对象本身,第一次表示为HeadContext对象。
- HeadContext#channelRead表示该handler的处理逻辑(只是这里没有任何处理逻辑),然后通过ctx.fireChannelRead(msg)唤醒当前Context对象下的下一个Context对象。
- 唤醒当前Context对象下的下一个Context对象的处理逻辑统一在AbstractChannelHandlerContext当中实现的。
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
public ChannelHandlerContext fireChannelRead(final Object msg) {
// findContextInbound寻找当前Context的下一个Context对象
invokeChannelRead(findContextInbound(), msg);
return this;
}
// findContextInbound寻找当前Context的下一个Context对象
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
// 进入到转为下一个Context的执行逻辑的核心函数
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 下一个待执行的Context对象
next.invokeChannelRead(m);
} else {
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// handler()返回Context对象
// channelRead被重载执行逻辑,需要在实现中再次执行ctx.fireChannelRead(msg);
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
}
- AbstractChannelHandlerContext作为Context的父类,其核心逻辑在于负责寻找下一个Context对象并开始下一个Context对象的执行逻辑。
- AbstractChannelHandlerContext#fireChannelRead负责查找下一个Context对象并通过AbstractChannelHandlerContext#invokeChannelRead实现当前Context到下一个Context执行的交接。
-
AbstractChannelHandlerContext#invokeChannelRead内部执行已经是下一个Context对象的channelRead方法。
-AbstractChannelHandlerContext作为Context的父类,提供了通用的查询Context、执行Context的方法,本质这些逻辑都是在前一个Context对象的方法中执行,然后通过查询下一个Context对象进行切换。
public class LoggingHandler extends ChannelDuplexHandler {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "RECEIVED", msg));
}
ctx.fireChannelRead(msg);
}
}
- 以LoggingHandler为例,channelRead()内部负责执行本职的log功能,同时通过ctx.fireChannelRead(msg)继续触发下一个Handler的操作。
- 所有的Handler操作本职都是方法的串行。
输出消息处理流程
ChannelOutboundHandler- bind 事件,绑定端口。
- close事件,关闭channel。
- connect事件,用于客户端,连接一个远程机器。
- disconnect事件,用于客户端,关闭远程连接。
- deregister事件,用于客户端,在执行断开连接disconnect操作后调用,将channel从EventLoop中注销。
- read事件,用于新接入连接时,注册成功多路复用器上后,修改监听为OP_READ操作位。
- write事件,向通道写数据。
- flush事件,将通道排队的数据刷新到远程机器上。
- ChannelOutboundInvoker负责处理上述各类事件。
DefaultChannelPipeline#write
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
public ChannelFuture write(Object msg) {
// 进入pipeline的处理流程
return pipeline.write(msg);
}
}
public class DefaultChannelPipeline implements ChannelPipeline {
public final ChannelFuture write(Object msg) {
// 从tail开始执行
return tail.write(msg);
}
}
- DefaultChannelPipeline#write从pipeline的tail开始进行访问。
- tail.write()会调用父类AbstractChannelHandlerContext#write开始pipeline的handler的调用。
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
// 负责从tail开始往head进行遍历,查找输出事件的handler
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
// 省略代码
write(msg, false, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
// 从tail开始遍历查找下一个Context进行操作
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
// 执行Context对象的invokeWrite方法
next.invokeWrite(m, promise);
}
} else {
}
}
private void invokeWrite(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
// 执行handler自己重载的write方法
invokeWrite0(msg, promise);
} else {
// 执行handler自己重载的write方法或者直接使用父类的write方法
write(msg, promise);
}
}
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
// 参考LoggingHandler为例进行说明
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
}
- AbstractChannelHandlerContext#write通过findContextOutbound()查找tail的前一个输出事件的handler。
- AbstractChannelHandlerContext按照write => invokeWrite => invokeWrite0的顺序进行调用,在invokeWrite0的内部执行实际业务handler的write()方法。
- ((ChannelOutboundHandler) handler())返回实际的handler对象,如LoggingHandler。
- 实际handler对象如LoggingHandler内部会通过ctx.write(msg, promise)重新开始pipeline的handler对象的寻找,完成一次传递调用。
public class LoggingHandler extends ChannelDuplexHandler {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "WRITE", msg));
}
ctx.write(msg, promise);
}
}
image.png
- 图片来自 陶章好
参考
Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (客户端)
netty源码分析系列——ChannelHandler系列
网友评论