最近看了一个类嵌入式通道 EmbeddedChannel,这个类是干什么的呢?
比如说,我是做服务器端开发的,写了几个服务器端的编解码和业务处理类。写完了,要测试吧,怎么测呢?再写个客户端?别急着写客户端,看看 EmbeddedChannel 是否能帮上忙。
本文分为两部分,第一部分对 EmbeddedChannel 类的源码简单分析,第二部分看 EmbeddedChannel 的简单使用。
一、先来看下 EmbeddedChannel 的源码
EmbeddedChannel 继承自 AbstractChannel,有 10 多个成员变量。在 channel 中有两个地址,一个是本地地址 LOCAL_ADDRESS,一个是远程地址 REMOTE_ADDRESS 。在该类中,本地地址和远程地址默认值一样的,都设置为new EmbeddedSocketAddress。
EmbeddedSocketAddress 是 Netty 对 SocketAddress 类的扩展,里面只有一个 toString 方法,返回一个固定的值。
在 54 行,ChannelHandler 业务处理器的数组默认为空 EMPTY_HANDLERS。
在 55 行,EmbeddedChannel 类有一个状态枚举类 State,有打开 OPEN、活跃 ACTIVE、关闭 CLOSED 三种状态。
在 57 行,使用 Netty 的内部日志工厂类 InternalLoggerFactory 根据当前的类名,初始化日志打印实例。
在 59、60 行,有一对 ChannelMetadata 实例化类,当传递参数为 true 时,才允许用户断开连接,调用 connect 方法;当传递参数为 false 时,则不允许用户断开连接。
ChannelMetadata 是一个 final 类,很简单,里面有两个构造函数,两个成员变量。
再回过来继续看 EmbeddedChannel类,在 62 行,初始化了一个嵌入式线程 EmbeddedEventLoop,该类实现了 EventLoop 接口。
在 EmbeddedEventLoop 类中,有一个任务队列 Queue<Runnable> tasks,该任务队列的大小为 2,也就是只允许两个线程存在。
再回过来继续看 EmbeddedChannel类,在 63 行初始化了一个异步非阻塞回调监听器,在操作完成的时候,调用 recordException 方法。在 70 行,再次定义了一个 ChannelMetadata 类型的成员变量。
71 行是 ChannelConfig 配置的成员变量。在 73、74 行分别定义了一个入站和出站的队列。75 行异常变量,76 行状态的成员变量。
81 行,默认的构造函数,设置业务处理通道的初始值为空的 EMPTY_HANDLERS,这个在上面的 54 行初始化。90 行的第二个构造函数,可以设置 ChannelId。99 行则可以传递多个业务处理器。还有其他构造函数,我们就不一一介绍了。
在 192 行的 setup 方法中,把一个或多个业务处理器 ChannelHandler 加入到 ChannelPipeline 中。在 207 行,如果 register 为 true 时,再把当前类注册到线程池,以便获取异步非阻塞的执行结果。
216 行的 register 方法,直接把当前对象注册到 loop中。
226 行,对父类的 newChannelPipeline 方法进行重新,并返回当前类的 EmbeddedChannelPipeline。
241 行,判断是否为打开的状态,如果不是关闭状态那就是打开状态。
246 行,判断是否为活跃的状态,如果等于活跃的状态,那就是活跃的状态。
253 行,返回入站队列。271 行,返回出站队列。如果为 null 时,初始化一下。
289、302行分别对入站读、出站读,并返回所选举的对象。
poll 方法,是从入站或出站的队列中选举一个返回,没有返回 null。
317 行,写入站,可以写入一个或多个入站对象。328 行写完了进行刷新操作,以便于写入到通道里。
338、348行写入通道,写入前先判断通道是打开的,然后才能写入。
360、365行对入站进行刷新。
380行,写出站,可以写入一个或多个出站对象。386 行,创建一个可循环的数组集合,数组大小为写入对象的个数。395 行,写完进行刷新。
在 751 行,定义了一个内部类 EmbeddedUnsafe,它继承 AbstractUnsafe。类中对一些基础参数进行了配置。
在 848 行,定义了一个内部类 EmbeddedChannelPipeline,它继承 DefaultChannelPipeline 类,类中对入站和出站进行了重写。
二、再来看看 EmbeddedChannel 的简单使用
public static void main(String[] argo){
// 定义一个业务处理类 handler1
class Handler1 extends ChannelInboundHandlerAdapter{
private Logger log = LoggerFactory.getLogger(Handler1.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("{}:{}", log.getName(), msg);
super.channelRead(ctx, msg);
}
}
// 再定义一个业务处理类 handler2
class Handler2 extends ChannelInboundHandlerAdapter{
private Logger log = LoggerFactory.getLogger(Handler2.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("{}:{}", log.getName(), msg);
super.channelRead(ctx, msg);
}
}
// 初始化子通道
ChannelInitializer channelInitializer = new ChannelInitializer<EmbeddedChannel>(){
@Override
protected void initChannel(EmbeddedChannel ch) throws Exception {
// 打印日志
ch.pipeline().addLast(new LoggingHandler());
// 对字符串数据进行编解码
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new Handler1());
ch.pipeline().addLast(new Handler2());
}
};
EmbeddedChannel embeddedChannel = new EmbeddedChannel(channelInitializer);
// 测试入站消息,必须以 ByteBuf 的形式写入,入站的数据都是 ByteBuf 形式
String str = "AAAAAAAAAAAA";
ByteBuf buf = Unpooled.buffer(str.getBytes().length);
buf.writeBytes(str.getBytes());
embeddedChannel.writeInbound(buf);
// 出站信息写入一个对象
embeddedChannel.writeInbound("CCCCCCC 非 ByteBuf");
// 测试出站消息
embeddedChannel.writeOutbound("BBBBBBBBBBBBB");
try {
// 睡一会儿,等待消息的返回
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
在上面的示例中,定义了两个业务处理类 handler1 和 handler2,并在 29 行的子通道初始化时,把它们加入了业务链 pipeline,为了方便测试,这里还使用了 LoggingHandler 日志工具打印类。
在 35、36、37、38 行,分别加入了字符串解码类 StringDecoder、字符串编码类 StringEncoder,以及自定义的 handler1 和 handler2。
在 41 行,以 子通道为参数,创建了一个 EmbeddedChannel。
在 46 行,将字符串以 ByteBuf 的形式写入入站通道,以测试字符串解码类。
在 48 行,将字符串字节写入入站通道,以测试字符串解码类。
在 50 行,将字符串写入出站通道,以测试字符串编码类。
在 53 行,主线程休眠 5秒,以防止还没写完通道就挂了。运行结果:
以 ByteBuf 写入和以字符串对象写入,在 LoggingHandler 打印的形式不一样,但是最终都走了 handler1 和 handler2 业务处理类。
经过 Debug 测试发现,使用字符串写入的对象并没有走 StringDecoder 解码类,因为这个解码类接收的是 ByteBuf 类型,所以这里只能使用 ByteBuf 类型写入入站测试数据。这是特别需要注意的地方。
如果在解码的时候,或者在服务器的配置参数里设置了直接缓存或者堆缓存,这里也需要注意一下,需要与之对应。
EmbeddedChannel 的使用很简单吧,如果只是对编解码的测试或者对业务 hanlder的测试,有它就足够了。
网友评论