netty中事件的传播主要包含inBound
事件和outBound
事件
ChannelInboundHandler extends ChannelHandler
ChannelOutboundHandler extends ChannelHandler
首先我们看下ChannelInboundHandler
接口,主要包含以下方法,基本都是用和连接事件
相关的
/**
* channel 注册到NioEventLoop上的回调
*/
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
/**
* channel 解除注册到NioEventLoop上的回调
*/
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
/**
* channel在激活之后的回调
*/
void channelActive(ChannelHandlerContext ctx) throws Exception;
/**
* channel失效之后的回调
*/
void channelInactive(ChannelHandlerContext ctx) throws Exception;
/**
* channel在读取数据,或者接收 到链接之后的回调,
* 对于服务端channel,这里的msg是一个链接
* 对于客户端channel,这里的msg是一个ByteBuf
*/
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
/**
* 数据读取完成后的一个回调
*/
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
/**
* 用户可以自定义的一些事件
* Gets called if an user event was triggered.
*/
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
/**
* 异常事件的传播
*/
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
而InBoundHandler
有其对应的对应的实现类
class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler
接下来基于ChannelInboundHandlerAdapter
讲一下read事件
在pipeline
中的传播流程,这是我们案例的服务端代码
public static void main(String[] args) throws Exception {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
/**
添加3个InBoundHandler
*/
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());
}
});
// Start the server.
ChannelFuture f = b.bind(8007).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
InBoundHandlerA
的实现如下:
public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(this.getClass().getName() + " read msg:" + msg);
super.channelRead(ctx, msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().pipeline().fireChannelRead("hello");
}
}
InBoundHandlerB
,InBoundHandlerC
的实现如下:
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(this.getClass().getName() + " read msg:" + msg);
super.channelRead(ctx, msg);
}
}
public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(this.getClass().getName() + " read msg:" + msg);
super.channelRead(ctx, msg);
}
}
启动server
,本地通过telnet
命令telnet localhost 8007
触发后,我们可以看到控制台会输出以下结果:
com.tyust.netty.inbound.InBoundHandlerA read msg:hello
com.tyust.netty.inbound.InBoundHandlerB read msg:hello
com.tyust.netty.inbound.InBoundHandlerC read msg:hello
17:21:48.214 [nioEventLoopGroup-3-1] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message hello that reached at the tail of the pipeline. Please check your pipeline configuration.
从控制台输出的日志可以得知,read
事件在pipeline
中的传播是基于InBoundHandler
在pipeline
中的添加顺序来的.接下来从InBoundHandlerA
的这行代码ctx.channel().pipeline().fireChannelRead("hello")
入手,基于源码分析一下这个中间的执行流程.
代码位置:io.netty.channel.DefaultChannelPipeline#fireChannelRead
data:image/s3,"s3://crabby-images/411bc/411bc9c522396ffc87f08f39a7b69953025da71b" alt=""
可以看出这个事件是在
Head
节点开始传播的。
基于前面的文章,我们都知道,实例中的代码的pipeline
结构是这样的
data:image/s3,"s3://crabby-images/b4e92/b4e927eff21c0d950ce1689029563abb05e6bb83" alt=""
而现在我们在IA这个节点触发了一个read
事件,流程也就是这样的
data:image/s3,"s3://crabby-images/3734a/3734afd5b54cb24f8c673b8aeab4f2437a565f3e" alt=""
根据代码可知,最开始是在head
节点开始传播的,从head
节点触发之后,我们继续跟代码,看到代码后会进入HeadContext
进行处理
data:image/s3,"s3://crabby-images/67412/67412fa2e87701d5d142093a7c864fb031065d6d" alt=""
data:image/s3,"s3://crabby-images/c4316/c4316e2b7ecbd93c9cd484eac371c1263a759ae9" alt=""
代码进入这个位置:io.netty.channel.DefaultChannelPipeline.HeadContext#channelRead
data:image/s3,"s3://crabby-images/02139/021391768fa39bb2b7d982b15b28fceacef5c201" alt=""
data:image/s3,"s3://crabby-images/6f9f5/6f9f5d5615cbecc41fa8a1c9cfc4230cbbc2c8f1" alt=""
这时候会开始去找寻下一个
InboundHandler
,我们看下找寻的逻辑,轮训pipeline
中的Handler
,碰到inbound
的就返回.data:image/s3,"s3://crabby-images/a6938/a69380a2c156619ed51e6e9aec7d72c4069eaff9" alt=""
inbound
和outbound
的标示是在构建context
的时候就定好了的
data:image/s3,"s3://crabby-images/4a555/4a55508bd8e37167139a6ce75c0d23ebc864c789" alt=""
继续跟代码,这时候就得到了InBoundHandlerA
,继续调用其invokeChannelRead
,就进入了我们的InBoundHandlerA#channelRead
方法
data:image/s3,"s3://crabby-images/82cef/82cef45596f240da403832f4250abec0cc9cea24" alt=""
data:image/s3,"s3://crabby-images/2c1a2/2c1a233bc9a4fac4c3416f57d62114718d15d02a" alt=""
同样的,InBoundHandlerB
和InBoundHandlerB
的执行也是一样的思路.
最后,事件会传播到我们的TailContext
节点
data:image/s3,"s3://crabby-images/dd937/dd937be44abc8b8cb874b685b6f7920dcb338483" alt=""
看下我们TailContext
中的read
逻辑,会打印出前面我们控制台中显示的那一段
Discarded inbound message hello that reached at the tail of the pipeline. Please check your pipeline configuration.
的日志,最后msg
进行回收,避免内存泄漏.
data:image/s3,"s3://crabby-images/72c05/72c05bc55d6cfe02f11a964874a9ab2fa8106fd8" alt=""
data:image/s3,"s3://crabby-images/cda2e/cda2e7bfe0ab3c93646e93e2efe34660a9672190" alt=""
好的,我们的InBound
事件的传播就分析到这里,接下来我们看outBound
事件。
看到ChannelOutboundHandler
接口的定义,可以看出基本都是跟IO读写
相关的事件
public interface ChannelOutboundHandler extends ChannelHandler {
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception;
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void read(ChannelHandlerContext ctx) throws Exception;
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
void flush(ChannelHandlerContext ctx) throws Exception;
}
而OutBoundHandler
也有其对应的对应的实现类
class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler
接下来基于ChannelOutboundHandlerAdapter
讲一下write事件
在pipeline
中的传播流程,基于之前的代码,我们改下添加handler
的部分
...
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerB());
ch.pipeline().addLast(new OutBoundHandlerC());
}
});
InBoundHandlerA
的实现如下:
public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println(this.getClass().getName() + " write msg: " + msg);
super.write(ctx, msg, promise);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ctx.executor().schedule(() -> ctx.channel().write("hello,world"), 3, TimeUnit.SECONDS);
}
}
InBoundHandlerB
,InBoundHandlerC
的代码实现如下:
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println(this.getClass().getName() + " write msg: " + msg);
super.write(ctx, msg, promise);
}
}
public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println(this.getClass().getName() + " write msg: " + msg);
super.write(ctx, msg, promise);
}
}
启动server
,本地通过telnet
命令telnet localhost 8007
触发后,我们可以看到控制台会输出以下结果:
com.tyust.netty.outbound.OutBoundHandlerC write msg: hello,world
com.tyust.netty.outbound.OutBoundHandlerB write msg: hello,world
com.tyust.netty.outbound.OutBoundHandlerA write msg: hello,world
从控制台输出的日志可以得知, write
事件在pipeline
中的传播是基于outBoundHandler
在pipeline
中的添加顺序逆向顺序来的。接下来从OutBoundHandlerA
的这行代码ctx.channel().pipeline().fireChannelRead("hello")
入手,基于源码分析一下这个中间的执行流程。
从代码流程来看,可以看出事件传播是从TailContext
开始传播
data:image/s3,"s3://crabby-images/84512/8451211acf218b9a87005937be7ab30a20386b92" alt=""
data:image/s3,"s3://crabby-images/aee05/aee05493a63b8cf080c02e25ac11345de12ca87f" alt=""
接着,会去pipeline
中开始寻找下一个节点OutBoundHandlerC
data:image/s3,"s3://crabby-images/91cf9/91cf98ddf99a54c2540a5f80255effe97a3ef1c0" alt=""
data:image/s3,"s3://crabby-images/6816a/6816aaa8427e9d01e553b46a646867460ed01acf" alt=""
data:image/s3,"s3://crabby-images/8ada8/8ada86b6c97a0de4dd675626e55b0579c01684bb" alt=""
接着代码就会进入OutBoundHandlerC#write
方法中,OutBoundHandlerC
中事件会继续沿着pipeline
往下进行传播,最终会传播到HeadContext
data:image/s3,"s3://crabby-images/3810d/3810d6c8e543bf7edef25355af5ce825a4ec0348" alt=""
流程就是如图所示:
data:image/s3,"s3://crabby-images/18c3e/18c3ecc0a937f0d526fbdf3e53b3762f0df0d5be" alt=""
最后我们看下在HeadContext
中对write事件
的处理,他会调用unsafe
的write
方法,unsafe#write
主要是将数据写会到客户端,这里对unsafe
不做过多的解析,后面我们会详细讲unsafe
。
data:image/s3,"s3://crabby-images/95d5f/95d5f0495b771f33e46562dbafef28493edc5c4c" alt=""
ok,outBound事件
就分析到这里,接下来我们分析异常的传播
修改我们server端的代码变成如下:
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());
ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerB());
ch.pipeline().addLast(new OutBoundHandlerC());
}
});
其中InBoundHandlerB
的代码如下,调用channelRead
方法的时候会抛出一个异常:
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
throw new RuntimeException(this.getClass().getName() + " happen error");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(this.getClass().getName() + " exceptionCaught exec ");
ctx.fireExceptionCaught(cause);
}
}
InBoundHandlerA
,InBoundHandlerC
,OutBoundHandlerA
,OutBoundHandlerB
,OutBoundHandlerC
重些exceptionCaught
方法,代码如下:
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(this.getClass().getName() + " exceptionCaught ");
super.exceptionCaught(ctx, cause);
}
启动server
端代码,本地通过telnet
命令telnet localhost 8007
触发后,随便输入字符,我们可以看到控制台会输出以下结果:
com.tyust.netty.exception.InBoundHandlerB exceptionCaught exec
com.tyust.netty.exception.InBoundHandlerC exceptionCaught
com.tyust.netty.exception.OutBoundHandlerA exceptionCaught
com.tyust.netty.exception.OutBoundHandlerB exceptionCaught
com.tyust.netty.exception.OutBoundHandlerC exceptionCaught
21:09:05.814 [nioEventLoopGroup-3-1] WARN io.netty.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.lang.RuntimeException: com.tyust.netty.exception.InBoundHandlerB happen error
从日志显示得出,异常是随着handler
的添加顺序进行传播,接下来我们进行断点分析;在调用完InBoundHandlerB#channelRead
方法后,事件会往下一个节点进行传播,但由于出现了异常,代码会进入这个位置io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)
,紧接着,他会去找pipeline
中下一个重写了exceptionCaught
的方法
找到了InboundHandlerB
data:image/s3,"s3://crabby-images/ee938/ee938529a3167ff039ecb80ee563182b44acd63b" alt=""
data:image/s3,"s3://crabby-images/10892/10892dcc749c7259810c54c97b3bdff9182d407d" alt=""
data:image/s3,"s3://crabby-images/7b9a8/7b9a8bd15e35fe1ac02bec386d52010db0987533" alt=""
也就出现了我们控制台中显示的com.tyust.netty.exception.InBoundHandlerB exceptionCaught exec
日志输出;
接下来他会继续找下一个重写了exceptionCaught
的方法也就是InBoundHandlerC
,以此类推,最后会执行到TailContext
的exceptionCaught
方法
data:image/s3,"s3://crabby-images/6d2b8/6d2b8f4ad33461f0e7a1a797c6ee312e8350f3dc" alt=""
data:image/s3,"s3://crabby-images/128b2/128b226cfdd3636e0a8964a2e93d3a08179a49c2" alt=""
最后我们看下TailContext
的exceptionCaught
方法,它什么事情都没做,只是把日志进行输出,然后进行一场回收
data:image/s3,"s3://crabby-images/b1656/b16562f8492be4674f26500ae8de13ab6e26c3f4" alt=""
data:image/s3,"s3://crabby-images/1f943/1f94362e7c0659f7616ced9c67e12d0b30c957d0" alt=""
这样其实很不友好,异常是反映我们系统是否出问题最重要的一个因素,我们需要将其捕获进行处理,因此常用的处理流程是调整我们的代码添加一个异常的handler
public class ExceptionHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(this.getClass().getName() + " 异常处理,e:" + cause);
}
}
...
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());
ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerB());
ch.pipeline().addLast(new OutBoundHandlerC());
ch.pipeline().addLast(new ExceptionHandler());
}
});
好了,我们的事件及异常传播到这里就结束了,留给大家两个问题,大家可以沿着我们上面的分析去解决这两个问题:
- 在
outbound事件
传播中,如果OutBoundHandlerA#handlerAdded
使用的case2中的代码,事件会是怎么样在pipeline
中传播的?
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//case 1
ctx.executor().schedule(() -> ctx.channel().write("hello,world"), 3, TimeUnit.SECONDS);
//case 2
ctx.executor().schedule(() -> ctx.write("hello,world"), 3, TimeUnit.SECONDS);
}
- 同样的,在
inBound事件
传播中,如果InBoundHandlerA#channelActive
方法中调用的是case2中的代码,那事件是如何传播的?
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//case 1
ctx.channel().pipeline().fireChannelRead("hello");
//case 2
ctx.fireChannelRead("hello");
}
网友评论