pipleline和handler以及pipeline的数据流向
- 先理一下思路。首先我们考虑到之前的文章分析,没创建一个channel就会创建一个pipeline与之对应。每个pipeline会有AbstractChannelHandlerContext属性的tail和head从而组成要给双向链表。那么pipeline的handler添加和数据流向其实都是基于HandlerContext和双向链表的性质。下面具体分析。
当然我们仍然下面这段代码分析,主要分析pipeline的添加
b.group(group)
//初始化工厂ReflectiveChannelFactory为后续链接connect方法创建NioSocketChannel对象
.channel(NioSocketChannel.class)
//将选项添加到AbstractBootstrap属性options. 实现类中Bootstrap的init(Channel channel)方法设置channel的类型
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});
pipeline.addXXX 都有一个重载的方法, 例如 addLast, 它有一个重载的版本
直接查看DefaultChannelPipeline源码:
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
我们一路查看下去,找到重载的方法,且记住我们入参里group、和name都是null
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//检查是否重复添加
checkMultiplicity(handler);
//创建DefaultChannelHandlerContext对象
newCtx = newContext(group, filterName(name, handler), handler);
// 将生成的newCtx插入handlercontex链表中
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
从addLast0方法看到,这里是将我们的handler添加到了tail的前面
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
```
* 还有一点就是上面addLast方法中newContext方法的入参filterName(name, handler),这里会生成handler名字并校验是否重复(有兴趣可查看源码类DefaultChannelPipeline中 generateName(ChannelHandler handler))
2、 那么inbond和outbond是决定pipleline的数据流向的关键。
记得我们上面的newContext方法中创建的DefaultChannelHandlerContext里的构造器,有isInbound和isOutbound俩方法分别根据接口来判断Inbound和Outbound
```java
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
一个Inbound事件通常由Inbound handler来处理。一个Inbound handler通常处理在IO线程产生的Inbound数据。Inbound数据通过真实的输入操作如 SocketChannel#read(ByteBuffer)来获取。如果一个inbound事件越过了最上面的inbound handler,该事件将会被抛弃到而不会通知你
一个outbound事件由outbound handler来处理。一个outbound handler通常由outbound流量如写请求产生或者转变的。如果一个outbound事件越过了底部的outbound handler,它将由channel关联的IO线程处理。IO线程通常运行的是真实的输出操作如 SocketChannel#write(byteBuffer).
inbound 事件传播方法:
ChannelHandlerContext#fireChannelRegistered()
ChannelHandlerContext#fireChannelActive()
ChannelHandlerContext#fireChannelRead(Object)
ChannelHandlerContext#fireChannelReadComplete()
ChannelHandlerContext#fireExceptionCaught(Throwable)
ChannelHandlerContext#fireUserEventTriggered(Object)
ChannelHandlerContext#fireChannelWritabilityChanged()
ChannelHandlerContext#fireChannelInactive()
ChannelHandlerContext#fireChannelUnregistered()
outbound事件传播方法:
ChannelHandlerContext#bind(SocketAddress, ChannelPromise)
ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext#write(Object, ChannelPromise)
ChannelHandlerContext#flush()
ChannelHandlerContext#read()
ChannelHandlerContext#disconnect(ChannelPromise)
ChannelHandlerContext#close(ChannelPromise)
ChannelHandlerContext#deregister(ChannelPromise)
如果我们捕获了一个事件, 并且想让这个事件继续传递下去, 那么需要调用 Context 相应的传播方法.
例如:
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Connected!");
ctx.fireChannelActive();
}
}
public clas MyOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
System.out.println("Closing ..");
ctx.close(promise);
}
}
上面的例子中, MyInboundHandler 收到了一个 channelActive 事件, 它在处理后, 如果希望将事件继续传播下去, 那么需要接着调用 ctx.fireChannelActive().
Outbound 操作(outbound operations of a channel)
以connect为例
Bootstrap.connect -> Bootstrap.doResolveAndConnect -> Bootstrap.doResolveAndConnect0 ->Bootstrap.doConnect ->AbstractChannel.connect->pipeline.connect->tail.connect-> AbstractChannelHandlerContext.connect
最后我们以AbstractChannelHandlerContext.connect 的源码分析:
@Override
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
// DefaultChannelPipeline 内的双向链表的 tail 开始, 不断向前寻找第一个 outbound 为 true 的 AbstractChannelHandlerContext, 然后调用它的 invokeConnect
final AbstractChannelHandlerContext next = findContextOutbound();
//next其实是找到headContext unsafe.connect(remoteAddress, localAddress, promise);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
当我们找到了一个 outbound 的 Context 后, 就调用它的 invokeConnect 方法,
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null);
}
return promise;
}
invokeConnect这个方法中会调用 Context 所关联着的 ChannelHandler 的 connect 方法。下面的handler()方法会返回一个handlerContex(根据上面next方法我们知道这里返回的为tailContext,但是tailContext并没有实现connect方法,所以这里的connect为其父类AbstractChannelHandlerContext的connect方法。也就是说再次从上面哪个方法开始,知道执行到headContext时,它实现了connect方法如下 方法三)。然后调用connect,包装handler所以即为hanler的connect。
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
//pipeline.channel().unsafe();
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
connect(remoteAddress, localAddress, promise);
}
}
方法三 headContext类找的connect方法
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
所以最终是unsafe.connect,而这unsafe的由来我们前面也分析过,看HeadContext的构造器unsafe = pipeline.channel().unsafe(); 所以它是来自channel。那么channel来自哪呢
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, true, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
还记得 .channel(NioSocketChannel.class)这里就是channel的来源
接下来我们找到unsafe是哪里创建的,查看NioSocketChannel构造器
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
构造器跟踪流程:NioSocketChannel->AbstractNioByteChannel->AbstractChannel
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
同时newUnsafe() 并没有再AbstractChannel实现,而是在NioSocketChannel实现,这是为什么呢?
protected AbstractNioUnsafe newUnsafe() {
return new NioSocketChannelUnsafe();
}
其实在子类实现,是由于不同的协议用的Unsafe会不同,所以要根据子类区别对待
继续跟踪NioSocketChannelUnsafe但是该类并未实现connect方法,所以查找父类直到找到
AbstractNioUnsafe中的connect方法
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
//省略
//doConnect这里的实现在子类中
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
//省略
}
这段代码的重点就在doConnect,而这个方法在该类中是没有实现的,实现类在子类NioSocketChannel中
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
注意:上面这个方法就是和java的NIO联系的地方了
重点分析:
1、首先doBind0方法 使用SocketUtils.bind(javaChannel(), localAddress);
其中的javaChannel()根据java版本选择nio还是bio
private void doBind0(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
SocketUtils.bind(javaChannel(), localAddress);
} else {
SocketUtils.bind(javaChannel().socket(), localAddress);
}
}
在NIO中javaChannel其实获取SelectableChannel,这里SelectableChannel在之前介绍过。所以bind方法
最后调用socketChannel.bind(address); nio的在前面已介绍这里不再赘述。至此connect方法就追踪到这里。
总结connect事件在outbound中的顺序,结合上面Bootstrap.connect最后到达handler的connect就形成了下面这个循环
Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect
直到head中的connect,这里我们上面分析过了。所以从connect事件来管中窥豹的话。就借用官网的数据流程图吧
参考官网的事件流转图
I/O Request
via Channel or
ChannelHandlerContext
|
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [ method call] [method call] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) |
+-------------------------------------------------------------------+
- 由于篇幅过长下篇继续讲解Inbound事件的源码
网友评论