本文涉及的知识点包括:
1.Netty中使用的主流编解码框架有哪些;
2.如何不写服务端、客户端就可以对编解码或hanlder进行测试;
3.如何把编解码两个类合成一个类,以及更简便的方法;
使用Netty框架有一段时间了,存在很多疑惑。其中之一的疑惑,可以使用的编解码协议那么多,看得眼花缭乱,到底该使用哪个呢?就是因为多,所以不知道该选择哪个,特别是对于新手而言,这太难了!
Netty中粘包拆包,小试牛刀
Netty整合MessagePack、LengthFieldBasedFrameDecoder解决粘包/拆包问题
Netty整合Protobuf编解码,并解决半包问题
Netty整合JBoss Marshalling编解码
上面是整理的编解码案例,使用的主流编解码框架如下,案例中没有最后一个,Facebook的Thrift也是当今主流编解码框架之一,在这里也列出来吧,有时间可以研究研究。
Google的Protobuf
JBoss的Marshalling
MessagePack
Facebook的Thrift
除了上面的这几种编解码,还有下面这些Netty内置的、开箱即用的处理粘包/半包解码类,可以配合上面的编解码使用;
FixedLengthFrameDecoder
LineBasedFrameDecoder
DelimiterBasedFrameDecoder
LengthFieldBasedFrameDecoder
通过这些类的整理,并查看了相关源代码,可以发现,虽然可以使用的编解码协议很多,整理一下也不外乎两种,一种是对象之间互转,另一种是对象和二进制之间互转。
在Netty中就有这么两对类,ByteToMessageDecoder、MessageToByteEncoder、
MessageToMessageDecoder、MessageToMessageEncoder。
更有意思的是:
FixedLengthFrameDecoder extends ByteToMessageDecoder
LineBasedFrameDecoder extends ByteToMessageDecoder
DelimiterBasedFrameDecoder extends ByteToMessageDecoder
LengthFieldBasedFrameDecoder extends ByteToMessageDecoder
没错,这四种处理粘包/半包的类都继承了ByteToMessageDecoder,只是各自的实现方式不同而已。也就是说,我们可以根据业务需求继承ByteToMessageDecoder个性化编解码类。另外,其他一些编解码的类,也和这两对类脱不了干系。
MarshallingDecoder extends LengthFieldBasedFrameDecoder
ProtobufEncoder extends MessageToMessageEncoder<MessageLiteOrBuilder>
有时候,既写编码类又写解码类,还要在使用时写两次,很麻烦,把编解码写在一个类里,又会影响调用时候的性能;Netty提供了一个合成类供大家使用,它就是CombinedChannelDuplexHandler,把编解码写在一起,但是又不影响使用的性能,真是一举两得。
基于Netty框架写个demo不容易,既写服务端还要写客户端,这个棘手的问题Netty框架也替我们考虑了,有一个内置的类可以用,它便是EmbeddedChannel,有了这两个强大的类,接下来可以很方便的写demo了。
下面就写一个二进制转化为String的编解码测试用例,第一个类是编解码合成类;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* 二进制和String的相互转换
* @author 程就人生
* @date 2019年12月3日
*/
public class StringBinaryCodec extends CombinedChannelDuplexHandler<ByteToStringDecoder, StringToByteEncoder>{
//将编解码类合在一起
public StringBinaryCodec(){
super(new ByteToStringDecoder(), new StringToByteEncoder());
}
}
/**
* 把二进制解码成String字符串
* @author 程就人生
* @date 2019年12月3日
*/
class ByteToStringDecoder extends ByteToMessageDecoder{
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 标记一下当前的readIndex的位置
in.markReaderIndex();
// 判断包头长度
if (in.readableBytes() < 4) {// 不够包头
return;
}
// 读取传送过来的消息的长度。
int length = in.readInt();
// 长度如果小于0
if (length < 0) {// 非法数据,关闭连接
ctx.close();
}
// 读到的消息体长度如果小于传送过来的消息长度
if (length > in.readableBytes()) {
// 重置读取位置
in.resetReaderIndex();
return;
}
// 直接缓冲,读取字符串
byte[] array = new byte[length];
in.readBytes(array, 0, length);
String str = new String(array,"utf-8");
out.add(str);
System.out.println("入站数据:"+ str);
}
}
/**
* 把字符串编码成二进制
* @author 程就人生
* @date 2019年12月3日
*/
class StringToByteEncoder extends MessageToByteEncoder<String> {
@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
System.out.println("出站数据:"+msg);
// 将对象转换为byte
byte[] bytes = msg.getBytes();
// 读取消息的长度
int length = bytes.length;
// 先将消息长度写入,也就是消息头
out.writeInt(length);
// 消息体中包含我们要发送的数据
out.writeBytes(msg.getBytes());
}
}
第二个是测试用例:
import java.nio.charset.Charset;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.embedded.EmbeddedChannel;
/**
* 测试用例
* @author 程就人生
* @date 2019年12月3日
*/
public class TestCode {
@SuppressWarnings("rawtypes")
@Test
public void test(){
//通道初始化
ChannelInitializer channelInitializer = new ChannelInitializer<EmbeddedChannel>(){
@Override
protected void initChannel(EmbeddedChannel ch) throws Exception {
//编解码合二为一,用一个代替两个
ch.pipeline().addLast(new StringBinaryCodec());
// ch.pipeline().addLast(new ByteToStringDecoder());
// ch.pipeline().addLast(new StringToByteEncoder());
}
};
//内置的测试类
EmbeddedChannel channel = new EmbeddedChannel(channelInitializer);
//入站模拟
String str = "这是入站数据啦";
for(int i=0;i<100;i++){
byte[] bytes = (str+i).getBytes(Charset.forName("utf-8"));
ByteBuf buf = Unpooled.buffer(bytes.length + 4);
buf.writeInt(bytes.length);
buf.writeBytes(bytes);
channel.writeOneInbound(buf);
}
//出站模拟测试
str = "你好呀,这是出站数据啦";
for(int j=0;j<100;j++){
channel.writeAndFlush(str+j);
}
}
}
测试结果:
入站数据:这是入站数据啦0
入站数据:这是入站数据啦1
入站数据:这是入站数据啦2
入站数据:这是入站数据啦3
入站数据:这是入站数据啦4
出站数据:你好呀,这是出站数据啦0
出站数据:你好呀,这是出站数据啦1
出站数据:你好呀,这是出站数据啦2
出站数据:你好呀,这是出站数据啦3
出站数据:你好呀,这是出站数据啦4
肯定有小伙伴说,Netty不是提供了ByteToMessageCodec合成类吗,你怎么还自个儿发明轮子?没错,直接继承ByteToMessageCodec,写起来代码更简洁,下面就继承一下吧;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
/**
* 二进制和String的相互转换
* @author 程就人生
* @date 2019年12月3日
*/
public class StringBinaryCodec2 extends ByteToMessageCodec<String>{
@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
System.out.println("出站数据:"+msg);
// 将对象转换为byte
byte[] bytes = msg.getBytes();
// 读取消息的长度
int length = bytes.length;
// 先将消息长度写入,也就是消息头
out.writeInt(length);
// 消息体中包含我们要发送的数据
out.writeBytes(msg.getBytes());
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 标记一下当前的readIndex的位置
in.markReaderIndex();
// 判断包头长度
if (in.readableBytes() < 4) {// 不够包头
return;
}
// 读取传送过来的消息的长度。
int length = in.readInt();
// 长度如果小于0
if (length < 0) {// 非法数据,关闭连接
ctx.close();
}
// 读到的消息体长度如果小于传送过来的消息长度
if (length > in.readableBytes()) {
// 重置读取位置
in.resetReaderIndex();
return;
}
// 直接缓冲,读取字符串
byte[] array = new byte[length];
in.readBytes(array, 0, length);
String str = new String(array,"utf-8");
out.add(str);
System.out.println("入站数据:"+ str);
}
}
测试一下,效果一样的,真是棒极了。
下面,再来写一个对象转对象的编解码类,其中的一个对象需要借助于Google的protobuf进行生成,不会的可以翻看以前的文章。
这里就直接继承Netty提供的MessageToMessageCodec编解码合成类吧,来看看代码;
import java.util.List;
import com.example.demo.test.PersonMsg.person;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
/**
* 对象到对象的编码示例
* @author 程就人生
* @date 2019年12月3日
*/
public class ObjectsCodec extends MessageToMessageCodec<BinaryWebSocketFrame, PersonMsg.person>{
/**
* 出站编码
*/
@Override
protected void encode(ChannelHandlerContext ctx, person msg, List<Object> out) throws Exception {
ByteBuf buf = Unpooled.buffer();
buf.writeBytes(msg.toByteArray());
BinaryWebSocketFrame binaryWebSocketFrame = new BinaryWebSocketFrame(buf);
out.add(binaryWebSocketFrame);
System.out.print("出站数据:" + msg.toString());
}
/**
* 入站解码
*/
@Override
protected void decode(ChannelHandlerContext ctx, BinaryWebSocketFrame msg, List<Object> out) throws Exception {
PersonMsg.person person = PersonMsg.person.parseFrom(msg.content().array());
System.out.print("入站数据“" + person.toString());
out.add(person);
}
}
最后测试类:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
/**
* 测试用例
* @author 程就人生
* @date 2019年12月3日
*/
public class TestCode2 {
@SuppressWarnings("rawtypes")
@Test
public void test(){
//通道初始化
ChannelInitializer channelInitializer = new ChannelInitializer<EmbeddedChannel>(){
@Override
protected void initChannel(EmbeddedChannel ch) throws Exception {
//编解码合二为一,用一个代替两个
ch.pipeline().addLast(new ObjectsCodec());
}
};
//内置的测试类
EmbeddedChannel channel = new EmbeddedChannel(channelInitializer);
//入站模拟
PersonMsg.person.Builder rb;
for(int i=0;i<100;i++){
rb = PersonMsg.person.newBuilder().setUid(i+"").setUsername("aaaa"+i);
byte[] bytes = rb.build().toByteArray();
ByteBuf buf = Unpooled.buffer(bytes.length);
buf.writeBytes(bytes);
BinaryWebSocketFrame binaryWebSocketFrame = new BinaryWebSocketFrame(buf);
channel.writeOneInbound(binaryWebSocketFrame);
}
//出站模拟测试
for(int j=0;j<100;j++){
rb = PersonMsg.person.newBuilder().setUid(j+"").setUsername("bbbb"+j);
channel.writeAndFlush(rb.build());
}
}
}
测试结果如下:
入站数据“uid: "0"
username: "aaaa0"
入站数据“uid: "1"
username: "aaaa1"
入站数据“uid: "2"
username: "aaaa2"
入站数据“uid: "3"
username: "aaaa3"
入站数据“uid: "4"
username: "aaaa4"
出站数据:uid: "0"
username: "bbbb0"
出站数据:uid: "1"
username: "bbbb1"
出站数据:uid: "2"
username: "bbbb2"
出站数据:uid: "3"
username: "bbbb3"
出站数据:uid: "4"
username: "bbbb4"
总结
不管是服务器端还是客户端,数据接收不到,那就只有一个原因,服务器端和客户端的编解码不一样
网友评论